package com.sanji.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal stand-alone Kafka consumer demo: subscribes to a single topic with
 * String keys/values and prints every record it receives to standard output.
 *
 * <p>Connection settings are hard-coded in the constants below.
 *
 * Created by peter on 17-3-18.
 */
public class NativeConsumer {
  /** Broker address used for the initial connection. */
  private static final String BOOTSTRAP_SERVERS = "192.168.2.44:32772";
  /** Consumer group this process joins. */
  private static final String GROUP_ID = "newConsumer";
  /** Topic to read from. */
  private static final String TOPIC = "topic";
  /** Value passed to {@code poll}, in milliseconds. */
  private static final long POLL_TIMEOUT_MS = 100L;
  /** Pause applied before printing each record, in milliseconds. */
  private static final long PER_RECORD_DELAY_MS = 5000L;

  /**
   * Entry point: runs the consumer loop and prints a marker line before and after it.
   *
   * @param args ignored
   */
  public static void main(String[] args) {
    System.out.println("begin consumer");
    connectionKafka();
    System.out.println("finish consumer");
  }

  /**
   * Creates a consumer, subscribes to {@link #TOPIC} and prints each received record
   * (offset, key, value) after sleeping {@link #PER_RECORD_DELAY_MS} milliseconds.
   *
   * <p>The loop runs until the calling thread is interrupted. On interruption the
   * remaining records of the current batch are not printed, the consumer is closed,
   * and the thread's interrupt status is restored before this method returns.
   */
  public static void connectionKafka() {
    Properties props = new Properties();
    props.put("auto.offset.reset", "earliest");
    props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    props.put("group.id", GROUP_ID);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "100");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    boolean interrupted = false;
    // try-with-resources guarantees the consumer is closed on every exit path.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
      // subscribe takes a list, so more than one topic could be consumed here.
      consumer.subscribe(Arrays.asList(TOPIC));
      while (!interrupted) {
        ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);

        for (ConsumerRecord<String, String> record : records) {
          try {
            Thread.sleep(PER_RECORD_DELAY_MS);
          } catch (InterruptedException e) {
            // Treat interruption as a shutdown request instead of swallowing it.
            interrupted = true;
            break;
          }
          System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value() + ";");
        }
      }
    } finally {
      // Restore the flag only after the consumer has been closed (the resource is
      // closed before this finally block runs), so close() does not observe it.
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
